Fix a flaky test in ApproximateQuantilesTest #39132

Merged
shunping merged 1 commit into apache:master from shunping:fix-flaky-quantile-test on Jun 27, 2026
@shunping shunping commented Jun 27, 2026

Collaborator

In test_batched_quantiles, when configuring ApproximateQuantiles.Globally with key=sum, multiple input tuples (specifically (72.5, 225) and (22.5, 275)) evaluate to the exact same key value (297.5).

Because sum is the sole comparison key, the relative order of elements with identical sums is non-deterministic: it depends on bundle execution and merging order as well as on shared class-level jitter state. As a result, assertions on the exact elements in the computed quantiles fail intermittently.
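
To illustrate the tie, here is a minimal sketch in plain Python. It uses `sorted` rather than the Beam `ApproximateQuantiles` code path, so it only demonstrates that `key=sum` cannot distinguish the two tuples and that their relative order then depends on arrival order:

```python
# Illustrative only: plain Python sorting, not the Beam code path.
a = (72.5, 225)
b = (22.5, 275)

# Both tuples map to the same key, so key=sum cannot order them.
assert sum(a) == sum(b) == 297.5

# Python's sort is stable: tied elements keep their input order,
# so the result depends on which element arrives first.
order_1 = sorted([a, b], key=sum)
order_2 = sorted([b, a], key=sum)

print(order_1)  # [(72.5, 225), (22.5, 275)]
print(order_2)  # [(22.5, 275), (72.5, 225)]
```

In a pipeline, the arrival order is decided by how bundles are executed and merged, which is why the assertion only fails some of the time.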

Failed test example:
https://github.com/apache/beam/actions/runs/28270530208/job/83766814632?pr=39130

Traceback:

_______________ ApproximateQuantilesTest.test_batched_quantiles ________________
[gw3] linux -- Python 3.10.20 /runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloudcoverage/py310-cloudcoverage/bin/python

self = <apache_beam.transforms.stats_test.ApproximateQuantilesTest testMethod=test_batched_quantiles>

    def test_batched_quantiles(self):
>     with TestPipeline() as p:

apache_beam/transforms/stats_test.py:482: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:652: in __exit__
    self.result = self.run()
apache_beam/testing/test_pipeline.py:123: in run
    state = result.wait_until_finish(duration=self.timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7bd4656ec820>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      last_error_text = None
    
      def read_messages() -> None:
        nonlocal last_error_text
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            mr = message.message_response
            logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
            if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
              last_error_text = mr.message_text
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive
E           self.consumer.process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process
E           delayed_applications = self.dofn_runner.process(o)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process
E           self._reraise_augmented(exn, windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1589, in _reraise_augmented
E           raise exn
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 685, in invoke_process
E           self.output_handler.handle_process_outputs(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1684, in handle_process_outputs
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive
E           self.consumer.process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process
E           delayed_applications = self.dofn_runner.process(o)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process
E           self._reraise_augmented(exn, windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1589, in _reraise_augmented
E           raise exn
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 685, in invoke_process
E           self.output_handler.handle_process_outputs(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1684, in handle_process_outputs
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive
E           self.consumer.process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process
E           delayed_applications = self.dofn_runner.process(o)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process
E           self._reraise_augmented(exn, windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1610, in _reraise_augmented
E           raise new_exn
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 913, in invoke_process
E           self._invoke_process_per_window(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1058, in _invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/transforms/core.py", line 2126, in <lambda>
E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/testing/util.py", line 202, in _equal
E           raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: [[(99.9, 499), (72.5, 225), (50.0, 0)]] == [[(99.9, 499), (22.5, 275), (50.0, 0)]], unexpected elements [[(99.9, 499), (22.5, 275), (50.0, 0)]], missing elements [[(99.9, 499), (72.5, 225), (50.0, 0)]] [while running 'checkGloballyWithKeyAndReversed/Match']

apache_beam/runners/portability/portable_runner.py:572: RuntimeError

This PR introduces a secondary sorting key via a static helper method, _sum_and_second(x). If the sums of two elements are identical, the elements are compared by their second element, which makes the comparison completely deterministic.
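
A minimal sketch of such a tie-breaking key is shown below. The function name `sum_and_second` and its body are assumptions for illustration; the actual helper in stats_test.py may be written differently. Again this uses plain `sorted`, not the Beam transform:

```python
def sum_and_second(x):
    """Hypothetical composite key: primary = sum, secondary = second element."""
    return (sum(x), x[1])


a = (72.5, 225)
b = (22.5, 275)

# The keys are (297.5, 225) and (297.5, 275): equal sums, distinct tie-breakers.
ascending_1 = sorted([a, b], key=sum_and_second)
ascending_2 = sorted([b, a], key=sum_and_second)

# The result no longer depends on input order.
assert ascending_1 == ascending_2 == [(72.5, 225), (22.5, 275)]

# The same holds for a reversed sort.
descending_1 = sorted([a, b], key=sum_and_second, reverse=True)
descending_2 = sorted([b, a], key=sum_and_second, reverse=True)
assert descending_1 == descending_2 == [(22.5, 275), (72.5, 225)]
```

Because tuples compare element by element, the second component is only consulted when the sums are equal, so the ordering of all other elements is unchanged.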

@shunping

Collaborator Author

r: @derrickaw

@shunping shunping marked this pull request as ready for review June 27, 2026 00:18
@gemini-code-assist

Contributor

Summary of Changes

This pull request addresses flakiness in the ApproximateQuantilesTest suite. The issue stemmed from non-deterministic ordering when multiple input tuples resulted in identical key values, causing test assertions to fail intermittently based on execution order. By implementing a secondary sorting key, the comparison logic is now fully deterministic, ensuring consistent test results.

Highlights

  • Deterministic Tie-Breaking: Introduced a new static helper method _sum_and_second to provide a secondary sorting key, ensuring deterministic behavior when elements have identical sums.
  • Test Stability: Updated ApproximateQuantilesTest.test_batched_quantiles to use the new helper method, effectively resolving flaky test failures caused by non-deterministic ordering during bundle merging.
@github-actions

Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a deterministic tie-breaker helper method, _sum_and_second, to prevent flaky test failures in test_batched_quantiles when using key=sum. The feedback suggests optimizing this helper function by replacing sum(x) with x[0] + x[1] to avoid the overhead of iterating over the 2-tuple.
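
As a rough sketch of what that suggestion could look like (the function names here are hypothetical and the real helper may differ), the indexed form assumes every element is a 2-tuple and produces the same key as the `sum`-based form for the tied inputs from the failing test:

```python
def key_with_sum(x):
    # Hypothetical baseline: works for tuples of any length.
    return (sum(x), x[1])


def key_with_indexing(x):
    # Hypothetical variant from the review feedback: assumes exactly two elements.
    return (x[0] + x[1], x[1])


tied_inputs = [(72.5, 225), (22.5, 275)]
keys_sum = [key_with_sum(x) for x in tied_inputs]
keys_indexed = [key_with_indexing(x) for x in tied_inputs]

# Both variants yield identical keys for these inputs.
assert keys_sum == keys_indexed == [(297.5, 225), (297.5, 275)]
```

The trade-off is generality: the indexed form silently ignores any elements beyond the second, while the `sum` form does not.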

Comment thread sdks/python/apache_beam/transforms/stats_test.py
@codecov

codecov Bot commented Jun 27, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.67%. Comparing base (eef26ca) to head (98bac64).
⚠️ Report is 7 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #39132      +/-   ##
============================================
+ Coverage     58.62%   58.67%   +0.05%     
  Complexity    15246    15246              
============================================
  Files          2769     2770       +1     
  Lines        275617   276049     +432     
  Branches      12163    12163              
============================================
+ Hits         161577   161976     +399     
- Misses       107621   107654      +33     
  Partials       6419     6419              
Flag Coverage Δ
python 79.36% <ø> (+0.07%) ⬆️

@shunping

Collaborator Author

Thanks!

@shunping shunping merged commit 5a2e98b into apache:master Jun 27, 2026
107 checks passed